package com.shockweb.rpc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Random;

import com.shockweb.bridge.Service;
import com.shockweb.bridge.ServiceRequest;
import com.shockweb.bridge.ServiceResult;
import com.shockweb.bridge.ServiceSpace;
import com.shockweb.client.exception.ClientException;
import com.shockweb.client.exception.ServerException;
import com.shockweb.client.exception.TimeoutException;
import com.shockweb.client.impl.RegisterClient;
import com.shockweb.client.impl.ServiceClient;
import com.shockweb.common.log.LogManager;
import com.shockweb.rpc.config.ClientConfig;
import com.shockweb.rpc.data.ServiceClientWapper;
import com.shockweb.rpc.data.ServiceRoot;

/**
 * 远程客户端管理，支持单例和多例模式
 * 
 * @author 彭明华
 * 2018年1月8日 创建
 */
public class ClientManager {

	/**
	 * 保存所有连接微服务的连接
	 */
	private Map<String,ServiceClientWapper> clients = new HashMap<String,ServiceClientWapper>();

	/**
	 * 连接到注册服务器客户端
	 */
	private RegisterClient registerClient = null;
	
	/**
	 * 获取注册服务器连接时间
	 */
	private long registerClientTime = System.currentTimeMillis();

	/**
	 * 连接到注册服务器客户端
	 * @return
	 */
	public RegisterClient getRegisterClient()throws ClientException{
		registerClientTime = System.currentTimeMillis();
		if(registerClient==null || !registerClient.isActive()){
			nextRegisterClient();
		}
		return registerClient;
	}
	
	/**
	 * 所有注册服务器地址
	 */
	private String[] registerServers = null;
	
	/**
	 * 当前注册服务器所有服务信息
	 */
	private ServiceRoot serviceRoot;
	
	/**
	 * 获取下一个可用的RegisterClient
	 */
	void nextRegisterClient()throws ClientException{
		try{
			serviceRoot.getLock().readLock().lock();
			if(registerServers!=null){
				int index = new Random().nextInt(registerServers.length);
		    	registerClient = new RegisterClient(config.getClientTimeOut(),config.getClientConnectTimeOut(),
	    			config.getClientSleepTime(),config.getClientIdleStateTime());
				int count = 0;
				while(!registerClient.isActive() && count<registerServers.length) {
					try{
						registerClient.connect(registerServers[index]);
					} catch (ClientException e) {
						LogManager.warn(this.getClass(),"连接注册服务器失败 Server=" + registerServers[index]);
					}
					count ++;
					index ++;
					if(index>=registerServers.length){
						index -= registerServers.length;
					}
				}
			}
		}finally{
			serviceRoot.getLock().readLock().unlock();
		}
		if(registerClient==null || !registerClient.isActive()){
			throw new ClientException("连接所有注册服务器异常");
		}
	}
	
	/**
	 * 注册服务器的配置
	 */
	private ClientConfig config = null;
	
	/**
	 * 返回配置
	 * @return
	 */
	public ClientConfig getConfig(){
		return config;
	}
	
	/**
	 * 微服务同步守护线程
	 */
	private SyncThread syncThread;
	
	/**
	 * 构造方法
	 */
	public ClientManager(ClientConfig config)throws ClientException{
		serviceRoot = new ServiceRoot();
		this.config = config;
		if(config.getRegisterServerUrls()!=null){
			registerServers = config.getRegisterServerUrls().split(",");
			nextRegisterClient();
		}
		syncThread = new SyncThread(this,serviceRoot);
	}

	

	/**
	 * 初始化客户端，简略参数
	 * @param registerServerUrls
	 * @throws ClientException
	 */
	public ClientManager(String registerServerUrls)throws ClientException{
		serviceRoot = new ServiceRoot();
		config = new ClientConfig();
		config.setRegisterServerUrls(registerServerUrls);
		if(config.getRegisterServerUrls()!=null){
			registerServers = config.getRegisterServerUrls().split(",");
			nextRegisterClient();
		}
		syncThread = new SyncThread(this,serviceRoot);
	}
	
	
	/**
	 * 刷新清楚无效连接和关闭长时间不用的连接
	 */
	public void refresh(){
		synchronized(this){
			if(registerClient!=null){
				if(registerClient.isActive()){
					if(registerClientTime+config.getActiveTime()<System.currentTimeMillis()){
						try{
							registerClient.close();
						}catch(Throwable e){
							LogManager.errorLog(this.getClass(), "关闭注册服务器连接失败 Server=" + registerClient.getHostUrl(), e);
						}
					}
				}
			}
		}
		synchronized(clients){
			Iterator<Entry<String,ServiceClientWapper>> its = clients.entrySet().iterator();
			List<String> lists = new ArrayList<String>();
			while(its.hasNext()){
				Entry<String,ServiceClientWapper> entry = its.next();
				ServiceClientWapper wapper = entry.getValue();
				if(wapper.isActive()){
					if(wapper.getTimeMillis()+config.getActiveTime()<System.currentTimeMillis()){
						try{
							wapper.close();
						}catch(Throwable e){
							LogManager.errorLog(this.getClass(), "关闭微服务器连接失败 Server=" + entry.getKey(), e);
						}
						lists.add(entry.getKey());
					}
				}else{
					lists.add(entry.getKey());
				}
			}
			for(String url:lists){
				clients.remove(url);
			}
		}
	}

	/**
	 * 获取注册中心spaceName命名空间的serviceName微服务连接
	 * @param spaceName
	 * @param serviceName
	 * @return
	 * @throws ClientException
	 */
	public ServiceClient getClient(String spaceName,String serviceName)throws ClientException{
		List<String> lists = new ArrayList<String>();
		try{
			serviceRoot.getLock().readLock().lock();
			ServiceSpace serviceSpace = serviceRoot.getServiceSpaces().get(spaceName);
			if(serviceSpace!=null){
				Service service = serviceSpace.getService(serviceName);
				if(service!=null){
					lists.addAll(service.getServiceHosts());
				}
			}
		}finally{
			serviceRoot.getLock().readLock().unlock();
		}
		if(lists==null || lists.isEmpty()){
			throw new ClientException("找不到微服务'" + spaceName + "." + serviceName + "'对应的服务器");
		}
		ClientException exception = null;
		ServiceClientWapper clientWapper = null;
		while(clientWapper==null && !lists.isEmpty()){
			int index = new Random().nextInt(lists.size());
			String host = lists.get(index);
			lists.remove(index);
			ServiceClientWapper wapper = null;
			synchronized(this){
				if(!isFuse(host)) {
					wapper = clients.get(host);
					if(wapper!=null){
						if(wapper.isActive()){
							clientWapper = wapper;
						}else{
							clients.remove(host);
						}
					}
					if(clientWapper==null){
						ServiceClient client = new ServiceClient(this,config.getClientTimeOut(),config.getClientConnectTimeOut(),
								config.getClientSleepTime(),config.getClientIdleStateTime());
						try {
							client.connect(host);
						} catch (ClientException e) {
							exception = e;
						}
						if(client.isActive()){
							clientWapper = new ServiceClientWapper(client);
							clients.put(host, clientWapper);
						}
					}
				}
			}
			index++;
			if(clientWapper!=null){
				return clientWapper.getServiceClient();
			}
		}
		throw exception;
	}
	
	
	/**
	 * 获取注册中心spaceName命名空间的serviceName微服务所有有效连接
	 * @param spaceName
	 * @param serviceName
	 * @return
	 * @throws ClientException
	 */
	public List<ServiceClient> getAllClients(String spaceName,String serviceName)throws ClientException{
		List<String> lists = new ArrayList<String>();
		try{
			serviceRoot.getLock().readLock().lock();
			ServiceSpace serviceSpace = serviceRoot.getServiceSpaces().get(spaceName);
			if(serviceSpace!=null){
				Service service = serviceSpace.getService(serviceName);
				if(service!=null){
					lists.addAll(service.getServiceHosts());
				}
			}
		}finally{
			serviceRoot.getLock().readLock().unlock();
		}
		if(lists==null || lists.isEmpty()){
			throw new ClientException("找不到微服务'" + spaceName + "." + serviceName + "'对应的服务器");
		}
		ClientException exception = null;
		List<ServiceClient> cs = new ArrayList<ServiceClient>();
		for(String host:lists){
			ServiceClientWapper wapper = null;
			synchronized(this){
				if(!isFuse(host)) {
					wapper = clients.get(host);
					if(wapper!=null){
						if(wapper.isActive()){
							cs.add(wapper.getServiceClient());
						}else{
							clients.remove(host);
						}
					}
					if(wapper==null){
						ServiceClient client = new ServiceClient(this,config.getClientTimeOut(),config.getClientConnectTimeOut(),
								config.getClientSleepTime(),config.getClientIdleStateTime());
						try {
							client.connect(host);
						} catch (ClientException e) {
							exception = e;
						}
						if(client.isActive()){
							wapper = new ServiceClientWapper(client);
							clients.put(host, wapper);
							cs.add(client);
						}
					}
				}
			}
		}
		if(cs.isEmpty()){
			throw exception;
		}else{
			return cs;
		}
	}
	

    /**
     * 调异步用spaceName命名空间的request所有服务器的微服务
     * @param request
     * @return
     * @throws ClientException
     */
    public List<Object> asyncRpcAllService(ServiceRequest request) throws ClientException {
    	List<Object> results = new ArrayList<>();
    	List<ServiceClient> clients = getAllClients(request.getSpaceName(),request.getService());
    	if(clients==null || clients.isEmpty()){
    		throw new ClientException("找不到" + request.getSpaceName() + "." + request.getService() + "'对应的服务器或服务");
    	}
    	for(ServiceClient client:clients){
   	    	ServiceResult result = null;
   	    	int i = 0;
   	    	while(true) {
   	        	try {
   	        		i++;
   	        		client.asyncRpcService(request);
   	        		result = null;
   	        		break;
   	        	}catch(TimeoutException | ServerException e) {
   	        		if(config.getClientMaxAutoRetries()<i) {
   	        			results.add(e);
   	        			break;
   	        		}
   	        	}catch(ClientException e) {
   	        		results.add(e);
   	        		break;
   	        	}
   	    	}
   	    	results.add(result);
    	}
    	return results;
    }
    
    /**
     * 调用spaceName命名空间的request所有服务器的微服务
     * @param request
     * @return
     * @throws ClientException
     */
    public List<Object> rpcAllService(ServiceRequest request) throws ClientException {
    	List<Object> results = new ArrayList<>();
    	List<ServiceClient> clients = getAllClients(request.getSpaceName(),request.getService());
    	if(clients==null || clients.isEmpty()){
    		throw new ClientException("找不到" + request.getSpaceName() + "." + request.getService() + "'对应的服务器或服务");
    	}
    	for(ServiceClient client:clients){
   	    	ServiceResult result = null;
   	    	int i = 0;
   	    	while(true) {
   	        	try {
   	        		i++;
   	        		result = client.rpcService(request);
   	        		break;
   	        	}catch(TimeoutException | ServerException e) {
   	        		if(config.getClientMaxAutoRetries()<i) {
   	        			results.add(e);
   	        			break;
   	        		}
   	        	}catch(ClientException e) {
   	        		results.add(e);
   	        		break;
   	        	}
   	    	}
   	    	results.add(result);
    	}
    	return results;
    }
    
    /**
     * 异步调用spaceName命名空间的request微服务
     * @param request
     * @return
     * @throws ClientException
     */
    public void asyncRpcService(ServiceRequest request) throws ClientException {
    	ServiceClient client = getClient(request.getSpaceName(), request.getService());
    	if(client==null){
    		throw new ClientException("找不到" + request.getSpaceName() + "." + request.getService() + "'对应的服务器或服务");
    	}
    	int i = 0;
    	while(true) {
        	try {
        		i++;
        		client.asyncRpcService(request);
        		break;
        	}catch(TimeoutException | ServerException e) {
        		if(config.getClientMaxAutoRetries()<i) {
        			throw e;
        		}
        	}catch(ClientException e) {
        		throw e;
        	}
    	}
    }
    /**
     * 调用spaceName命名空间的request微服务
     * @param request
     * @return
     * @throws ClientException
     */
    public ServiceResult rpcService(ServiceRequest request) throws ClientException {
    	ServiceClient client = getClient(request.getSpaceName(), request.getService());
    	if(client==null){
    		throw new ClientException("找不到" + request.getSpaceName() + "." + request.getService() + "'对应的服务器或服务");
    	}
    	ServiceResult result = null;
    	int i = 0;
    	while(true) {
        	try {
        		i++;
        		result = client.rpcService(request);
        		break;
        	}catch(TimeoutException | ServerException e) {
        		if(config.getClientMaxAutoRetries()<i) {
        			throw e;
        		}
        	}catch(ClientException e) {
        		throw e;
        	}
    	}
    	return result;
    }
    
    
    /**
     * 向spaceName命名空间的serviceName微服务的所有服务器method方法发送数据
     * @param spaceName
     * @param serviceName
     * @param method
     * @param data
     * @return
     * @throws ClientException
     */
    public List<Object> rpcAllService(String spaceName,String serviceName,String method,byte[] data) throws ClientException {
    	List<Object> results = new ArrayList<Object>();
    	List<ServiceClient> clients = getAllClients(spaceName,serviceName);
    	for(ServiceClient client:clients){
    		byte[] result = null;
    		
        	int i = 0;
        	while(true) {
            	try {
            		i++;
            		result = client.rpcService(serviceName,method,data);
            		break;
            	}catch(TimeoutException | ServerException e) {
            		if(config.getClientMaxAutoRetries()<i-1) {
            			results.add(e);
            			break;
            		}
            	}catch(ClientException e) {
            		results.add(e);
            		break;
            	}
        	}
    		results.add(result);
    	}
    	return results;
    }
    
    /**
     * 向spaceName命名空间的serviceName微服务method方法发送数据
     * @param spaceName
     * @param serviceName
     * @param method
     * @param data
     * @return
     * @throws ClientException
     */
    public byte[] rpcService(String spaceName,String serviceName,String method,byte[] data) throws ClientException {
    	ServiceClient client = getClient(spaceName, serviceName);
    	if(client==null){
    		throw new ClientException("找不到" + spaceName + "." + serviceName + "'对应的服务器或服务");
    	}
    	byte[] result = null;
    	int i = 0;
    	while(true) {
        	try {
        		i++;
        		result = client.rpcService(serviceName,method,data);
        		break;
        	}catch(TimeoutException | ServerException e) {
        		if(config.getClientMaxAutoRetries()<i-1) {
        			throw e;
        		}
        	}catch(ClientException e) {
        		throw e;
        	}
    	}
    	return result;
    }
	
	/**
	 * 关闭RPC远程连接管理异常
	 */
	public void close(){
		try {
			Thread.sleep(100);
		} catch (InterruptedException e1) {
		}
		synchronized(this){
			syncThread.close();
			try{
				registerClient.close();
			}catch(Exception e){
				LogManager.errorLog(this.getClass(),"关闭注册服务器连接失败", e);
			}
		}
		synchronized(this){
			Iterator<Entry<String,ServiceClientWapper>> its = clients.entrySet().iterator();
			while(its.hasNext()){
				Entry<String,ServiceClientWapper> entry = its.next();
				try{
					entry.getValue().close();
				}catch(Exception e){
					LogManager.errorLog(this.getClass(),"关闭微服务器连接失败", e);
				}
			}
		}
	}
	
	Map<String,Fuse> fuses = new HashMap<>();
	
    private final ReadWriteLock invokeLock = new ReentrantReadWriteLock();

    /**
     * 是否熔断
     * @param hostUrl
     * @return
     */
    private boolean isFuse(String hostUrl) {
    	if(config.getClientFuseCycleTime()>0 && config.getClientFuseCycleTimeNum()>0) {
			Fuse fuse = null;
	        try {
	        	invokeLock.readLock().lock();
	        	fuse = fuses.get(hostUrl);
	        }finally {
	        	invokeLock.readLock().unlock();
	        }
	    	if(fuse!=null){
	    		return fuse.isFuse();
	    	}else {
	    		return false;
	    	}
    	}else {
    		return false;
    	}
    }
	
    /**
     * 错误加1
     * @param client
     */
    public void addInvokeErrorCount(ServiceClient client) {
    	if(config.getClientFuseCycleTime()>0 && config.getClientFuseCycleTimeNum()>0) {
			Fuse fuse = null;
	        try {
	        	invokeLock.writeLock().lock();
	        	fuse = fuses.get(client.getHostUrl());
	        	if(fuse==null) {
	        		fuse = new Fuse(config.getClientFuseCycleTime(),config.getClientFuseCycleTimeNum(),
	        				config.getClientFuseWaitInterval(),config.getClientFuseErrorThreshold(),
	        				config.getClientFuseErrorPercentage());
	        		fuses.put(client.getHostUrl(), fuse);
	        	}
	        }finally {
	        	invokeLock.writeLock().unlock();
	        }
	        fuse.addInvokeErrorCount();
    	}
    }
    
    /**
     * 调用次数加1
     * @param client
     */
    public void addInvokeCount(ServiceClient client) {
    	if(config.getClientFuseCycleTime()>0 && config.getClientFuseCycleTimeNum()>0) {
			Fuse fuse = null;
	        try {
	        	invokeLock.writeLock().lock();
	        	fuse = fuses.get(client.getHostUrl());
	        	if(fuse==null) {
	        		fuse = new Fuse(config.getClientFuseCycleTime(),config.getClientFuseCycleTimeNum(),
	        				config.getClientFuseWaitInterval(),config.getClientFuseErrorThreshold(),
	        				config.getClientFuseErrorPercentage());
	        		fuses.put(client.getHostUrl(), fuse);
	        	}
	        }finally {
	        	invokeLock.writeLock().unlock();
	        }
	        fuse.addInvokeCount();
    	}
    }
}
